package cAmqp

import (
	"fmt"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/rabbitmq/amqp091-go"

	"gitee.com/csingo/cLog"
)

// AckType enumerates the acknowledgement outcomes that can be chosen for a
// delivery. The constants are consecutive values starting at 0.
// NOTE(review): the code that acts on these values is not in this part of the
// file; the exact broker call behind each constant should be confirmed there.
type AckType uint8

const (
	// ACK is the zero value of AckType.
	ACK AckType = iota
	// NACK has the value 1.
	NACK
	// REQUEUE has the value 2.
	REQUEUE
	// DROP has the value 3.
	DROP
)

// ConsumeType selects how a consumer reads messages; it is carried in
// ConsumeOption.ConsumeType.
// NOTE(review): the names suggest the AMQP basic.consume and basic.get
// methods, but the consuming loop is not visible here — confirm.
type ConsumeType uint8

const (
	// ConsumeType_BasicConsume is the zero value and therefore the default.
	ConsumeType_BasicConsume ConsumeType = iota
	// ConsumeType_BasicGet has the value 1.
	ConsumeType_BasicGet
)

// ConnectionType labels the role a channel is requested for. Its string value
// is used as the prefix of the channel cache key built in AmqpContainer.channel.
type ConnectionType string

const (
	// ConnectionType_Producer is the label "producer".
	ConnectionType_Producer ConnectionType = "producer"
	// ConnectionType_Consumer is the label "consumer".
	ConnectionType_Consumer ConnectionType = "consumer"
)

// MessageHandlerFunc receives a slice of deliveries together with a gin
// context. It is the type of ConsumeOption.Handler.
type MessageHandlerFunc func(ctx *gin.Context, delivery []amqp091.Delivery)

// ParamsHandlerFunc receives a parameter table and may return an error. It is
// the type of ConsumeOption.StartHandler and ConsumeOption.FinishHandler.
type ParamsHandlerFunc func(ctx *gin.Context, params amqp091.Table) error

// PauseHandlerFunc receives a parameter table and returns a boolean. It is the
// type of ConsumeOption.PauseHandler, where true means "pause consumption" and
// false means "keep consuming".
type PauseHandlerFunc func(ctx *gin.Context, params amqp091.Table) bool

// BindHandlerFunc receives a status flag and a parameter table. It is the type
// of ConsumeOption.BindHandler, which is described there as running after the
// bind call.
// NOTE(review): the meaning of status (presumably bind success) is decided by
// the caller, which is not visible here — confirm.
type BindHandlerFunc func(ctx *gin.Context, status bool, params amqp091.Table)

// ExchangeOption holds the exchange initialisation parameters used by
// ProduceOption.Exchange and ConsumeOption.Exchange.
// NOTE(review): the fields look like the arguments of an AMQP exchange
// declaration; the call that consumes them is not visible here — confirm.
type ExchangeOption struct {
	Durable     bool          `json:"durable"`
	AutoDeleted bool          `json:"auto_deleted"`
	Internal    bool          `json:"internal"`
	NoWait      bool          `json:"no_wait"`
	Arguments   amqp091.Table `json:"arguments"`
}

// QueueOption holds the queue initialisation parameters used by
// ProduceOption.Queue and ConsumeOption.Queue.
// NOTE(review): the fields look like the arguments of an AMQP queue
// declaration; the call that consumes them is not visible here — confirm.
type QueueOption struct {
	Durable     bool          `json:"durable"`
	AutoDeleted bool          `json:"auto_deleted"`
	Exclusive   bool          `json:"exclusive"`
	NoWait      bool          `json:"no_wait"`
	Arguments   amqp091.Table `json:"arguments"`
}

// BindOption holds the parameters for the bind call, as used by
// ProduceOption.Bind and ConsumeOption.Bind.
type BindOption struct {
	NoWait    bool          `json:"no_wait"`
	Arguments amqp091.Table `json:"arguments"`
}

// ProduceOption bundles the settings for publishing one message: which driver
// to use, how exchange, queue and binding are initialised, and the message
// itself.
type ProduceOption struct {
	// UUID       string             `json:"uuid"` // unique identifier, used to decide whether a dedicated connection is used
	DriverName string             `json:"driver_name"`
	DriverConf *AmqpConf_Driver   `json:"driver_conf"`
	Exchange   ExchangeOption     `json:"exchange"`  // exchange initialisation parameters
	Queue      QueueOption        `json:"queue"`     // queue initialisation parameters
	Bind       BindOption         `json:"bind"`      // parameters for the bind call
	OnlyPush   bool               `json:"only_push"` // whether to only push the message without defining the queue
	Mandatory  bool               `json:"mandatory"`
	Immediate  bool               `json:"immediate"`
	Message    amqp091.Publishing `json:"message"`
}

// ConsumeOption bundles the settings for one consumer: which driver to use,
// how exchange, queue and binding are initialised, how messages are read, and
// the callbacks invoked along the way. The callback fields are excluded from
// JSON encoding.
type ConsumeOption struct {
	// UUID        string           `json:"uuid"`         // unique identifier, used to decide whether a dedicated connection is used
	DriverName  string           `json:"driver_name"`  // name of a drivers entry in AmqpConf
	DriverConf  *AmqpConf_Driver `json:"driver_conf"`  // default channel configuration; when present it takes precedence, otherwise the channel configuration from AmqpConf is used
	Exchange    ExchangeOption   `json:"exchange"`     // exchange initialisation parameters
	Queue       QueueOption      `json:"queue"`        // queue initialisation parameters
	Bind        BindOption       `json:"bind"`         // parameters for the bind call
	ConsumeType ConsumeType      `json:"consume_type"` // whether to consume by reading messages one at a time
	BatchSize   int64            `json:"batch_size"`   // number of messages consumed per batch
	WaitTime    int64            `json:"wait_time"`    // wait interval between consume attempts; timing starts once the queue is drained, consumption is judged paused, or a read fails
	WaitCount   int64            `json:"wait_count"`   // how many times the queue may be found empty (not counted while there is a backlog), default 0; once exceeded, consumption is executed or the queue is destroyed
	Tag         string           `json:"tag"`          // consume parameter
	AutoAck     bool             `json:"auto_ack"`     // consume parameter
	Exclusive   bool             `json:"exclusive"`    // consume parameter
	NoLocal     bool             `json:"no_local"`     // consume parameter
	NoWait      bool             `json:"no_wait"`      // consume parameter
	Arguments   amqp091.Table    `json:"arguments"`    // consume parameter
	Params      amqp091.Table    `json:"params"`       // custom parameters supplied when the consumer is created, for use by the handlers

	Handler       MessageHandlerFunc `json:"-"` // handles queue messages
	PauseHandler  PauseHandlerFunc   `json:"-"` // decides whether consumption is paused: return true to pause, false to keep consuming
	StartHandler  ParamsHandlerFunc  `json:"-"` // runs when consumption starts
	FinishHandler ParamsHandlerFunc  `json:"-"` // runs when consumption finishes
	BindHandler   BindHandlerFunc    `json:"-"` // runs after the bind call
}

// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// connection
// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// connection wraps one AMQP connection together with the data needed to
// re-dial it and the channels that were opened on it.
type connection struct {
	// name is the key under which AmqpContainer stores this connection; it is
	// also included in log entries.
	name string
	// uri is the address passed to amqp091.DialConfig.
	uri string
	// conn is the underlying AMQP connection set by connect.
	conn *amqp091.Connection `json:"-"`
	// channels caches opened channels by the key built in AmqpContainer.channel.
	channels map[string]*amqp091.Channel `json:"-"`
	// close is the channel returned by conn.NotifyClose; notify receives from it.
	close chan *amqp091.Error `json:"-"`
	// count is incremented on every dial attempt made by connect.
	count int64
	// config is the dial configuration passed to amqp091.DialConfig.
	config amqp091.Config `json:"-"`
}

// connect dials the broker unless c.conn is already set and still open, then
// starts a goroutine running notify to watch for the connection closing.
//
// Dialling is retried until it succeeds. A failure on the very first attempt
// made by this connection (count <= 1) is logged at Panic level; every later
// failure is logged at Error level. Whether the Panic-level log aborts the
// program is up to cLog.
//
// c.conn is only replaced once a dial has succeeded, so a failed attempt never
// leaves a nil connection behind for concurrent users of the struct.
func (c *connection) connect() {
	if c.conn != nil && !c.conn.IsClosed() {
		return
	}

	// Pause between failed attempts so an unreachable broker does not turn the
	// retry loop into a busy loop that floods the log.
	const retryDelay = time.Second

	for {
		c.count++
		conn, err := amqp091.DialConfig(c.uri, c.config)
		if err == nil {
			c.conn = conn
			// Capacity 1 lets the library deliver the close error even if the
			// watcher goroutine has not reached its receive yet.
			c.close = conn.NotifyClose(make(chan *amqp091.Error, 1))

			go c.notify()

			return
		}

		entry := cLog.WithContext(nil, map[string]any{
			"source": "cAmqp.connection.connect",
			"name":   c.name,
			"uri":    c.uri,
			"err":    err.Error(),
		})
		if c.count <= 1 {
			entry.Panic("amqp客户端创建连接异常")
		} else {
			entry.Error("amqp客户端创建连接异常")
		}

		time.Sleep(retryDelay)
	}
}

// notify blocks until something arrives on c.close. A nil value (for example
// the result of receiving from a closed channel) ends the watcher quietly; a
// non-nil error is logged and followed by a reconnect through connect.
func (c *connection) notify() {
	closeErr := <-c.close
	if closeErr == nil {
		return
	}

	fields := map[string]any{
		"source": "cAmqp.connection.notify",
		"name":   c.name,
		"uri":    c.uri,
		"err":    closeErr.Error(),
	}
	cLog.WithContext(nil, fields).Error("amqp客户端connection断开")

	c.connect()
}

// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// container
// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// AmqpContainer is the registry of AMQP connections and driver configurations.
type AmqpContainer struct {
	// lock is held by saveConnection, saveDriver, channel and close while they
	// read or modify connections and drivers.
	lock *sync.Mutex
	// plock is not used by the methods in this part of the file.
	// NOTE(review): presumably serialises producers — confirm against its users.
	plock *sync.Mutex
	// connections maps a connection name to its connection wrapper.
	connections map[string]*connection
	// drivers maps a driver name to its configuration.
	drivers map[string]*AmqpConf_Driver
}

// saveConnection registers a connection under name if none exists yet and
// then makes sure it is connected. When name is already registered, the
// existing entry is kept and the uri and config arguments are not applied to
// it. The container lock is held for the whole call, including the dial.
func (c *AmqpContainer) saveConnection(name, uri string, config amqp091.Config) {
	c.lock.Lock()
	defer c.lock.Unlock()

	entry, exists := c.connections[name]
	if !exists {
		entry = &connection{
			name:     name,
			uri:      uri,
			config:   config,
			channels: make(map[string]*amqp091.Channel),
		}
		c.connections[name] = entry
	}
	entry.connect()

	fields := map[string]any{
		"source": "cAmqp.AmqpContainer.saveUri",
		"name":   name,
		"uri":    uri,
	}
	cLog.WithContext(nil, fields).Trace("amqp客户端链接配置注入成功")
}

// saveDriver registers driver configuration d under name. The call is a no-op
// when d is nil or when name is already registered; the first registration
// wins.
func (c *AmqpContainer) saveDriver(name string, d *AmqpConf_Driver) {
	c.lock.Lock()
	defer c.lock.Unlock()

	if d == nil {
		return
	}
	if _, registered := c.drivers[name]; registered {
		return
	}

	c.drivers[name] = d

	fields := map[string]any{
		"source": "cAmqp.AmqpContainer.saveDriver",
		"name":   name,
		"driver": d,
	}
	cLog.WithContext(nil, fields).Trace("amqp driver 注入成功")
}

// channel returns an open AMQP channel for the given driver and role, reusing
// a cached channel when one exists and is still open.
//
// The driver is driverConf when it is non-nil, otherwise the configuration
// registered under driverName. The connection is the one registered under the
// driver's Connection name.
//
// Returns:
//   - name: the cache key of the channel, built from typ and the driver; it is
//     the key AmqpContainer.close expects.
//   - channel: the cached or newly opened channel, nil on error.
//   - driver: the driver configuration that was resolved.
//   - err: AmqpConfDriverNotFoundErr when neither driverConf nor driverName is
//     given, AmqpConfDriverNameNotFoundErr when driverName is not registered,
//     AmqpConfConnectionNotFoundErr when the driver's connection is not
//     registered, amqp091.ErrClosed when the connection has no underlying
//     AMQP connection at the moment, or the error from opening the channel.
func (c *AmqpContainer) channel(ctx *gin.Context, driverName string, driverConf *AmqpConf_Driver, typ ConnectionType) (name string, channel *amqp091.Channel, driver *AmqpConf_Driver, err error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	if driverConf == nil && driverName == "" {
		err = AmqpConfDriverNotFoundErr
		return
	}

	// An explicit driverConf takes precedence over the registered driver.
	driver = driverConf
	if driver == nil {
		registered, found := c.drivers[driverName]
		if !found {
			err = AmqpConfDriverNameNotFoundErr
			return
		}
		driver = registered
	}

	conn, found := c.connections[driver.Connection]
	if !found {
		err = AmqpConfConnectionNotFoundErr
		return
	}

	// NOTE(review): driver is a pointer formatted with %s, so the key depends on
	// how AmqpConf_Driver formats (declared outside this view) — confirm that
	// this yields a stable, unique key.
	name = fmt.Sprintf("%s:%s", typ, driver)

	if cached, ok := conn.channels[name]; ok && !cached.IsClosed() {
		channel = cached
		return
	}

	// Without an underlying connection there is nothing to open a channel on;
	// report it as closed instead of dereferencing nil.
	if conn.conn == nil {
		err = amqp091.ErrClosed
		return
	}

	channel, err = conn.conn.Channel()
	if err != nil {
		return
	}
	conn.channels[name] = channel

	return
}

// close closes the cached channel stored under name on the connection
// registered as conn and removes it from the cache. It does nothing when the
// connection or the channel is unknown. The error returned by the channel's
// Close is discarded.
func (c *AmqpContainer) close(ctx *gin.Context, conn, name string) {
	c.lock.Lock()
	defer c.lock.Unlock()

	owner, known := c.connections[conn]
	if !known || owner.channels == nil {
		return
	}

	ch, cached := owner.channels[name]
	if !cached {
		return
	}

	ch.Close()
	delete(owner.channels, name)
}

// self is the package-level AmqpContainer, created with both mutexes allocated
// and with empty connection and driver registries.
var self = &AmqpContainer{
	lock:        new(sync.Mutex),
	plock:       new(sync.Mutex),
	connections: make(map[string]*connection),
	drivers:     make(map[string]*AmqpConf_Driver),
}
